package rabbitmq.publisher.confirms;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constants.Constants;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

public class PublisherConfirms {
    private static final Integer MESSAGE_COUNT = 200;
    static Connection createConnection() throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);//需要提前开放端口号
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        return factory.newConnection();

    }
    public static void main(String[] args) throws Exception {
        //策略1: 单独确认  单独确认策略, 消息条数: 200, 耗时: 7372 ms
//        publishingMessagesIndividually();
        //策略2: 批量确认  批量确认策略, 消息条数: 200, 耗时: 456 ms
//        publishingMessagesInBatches();
        //策略3: 异步确认  异步确认策略, 消息条数: 200, 耗时: 116 ms
//        handlingPublisherConfirmsAsynchronously();
    }

    /**
     * 异步确认
     *
     */
    private static void handlingPublisherConfirmsAsynchronously() throws Exception{
        try(Connection connection = createConnection()){
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列(使用内置交换机)
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3,true,false,false,null);
            long start = System.currentTimeMillis();
            //4. 监听confirm
            //集合中存放的是未确认的消息id2
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }

                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                    //可能需要进行重发操作,此处省略

                }
            });
            //5. 发送消息
            for (Integer i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3,null,msg.getBytes());
                confirmSeqNo.add(seqNo);

            }
            //消息可能没确认完
            while (!confirmSeqNo.isEmpty()){
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT,end-start);



        }
    }

    /**
     * 批量确认
     */
    private static void publishingMessagesInBatches() throws Exception{
        try(Connection connection = createConnection()){
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列(使用内置交换机)
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2,true,false,false,null);
            long start = System.currentTimeMillis();
            //4. 发送消息并等待确认
            int batchSize = 100;
            int outstandingMessageCount = 0;
            for (Integer i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2,null,msg.getBytes());
                //等待确认
                outstandingMessageCount++;
                if (outstandingMessageCount == batchSize){
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }

            }
            if (outstandingMessageCount > 0){
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT,end-start);


        }
    }

    /**
     * 单独确认
     */
    private static void publishingMessagesIndividually() throws Exception{
        try(Connection connection = createConnection()){
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列(使用内置交换机)
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1,true,false,false,null);
            long start = System.currentTimeMillis();
            //4. 发送消息并等待确认
            for (Integer i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1,null,msg.getBytes());
                //等待确认
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT,end-start);


        }
    }
}
